
##---------------------------------------------------------------
## Required libraries
##---------------------------------------------------------------

#----- Parallel computing
library(doParallel)

#----- Make clusters based on the number of CPU cores
cl <- makeCluster(detectCores())
registerDoParallel(cl)
getDoParWorkers()

#----- Support parallel excution
library(foreach)

#----- Stan (Hybrid Monte Carlo) for R
library(rstan)


##---------------------------------------------------------------
## Extract MCMC samples from Stan outputs
##---------------------------------------------------------------

#----- Load MCMC samples and Data
load("MCMCsample.RData")
load("Master.RData")

#------ Set Treatment (TRT), Outcome (OUT), Mediators (M) and Covariates (X)
Data <- Master
OUT <- Data$PM.2.5
TRT <- Data$SO2.SC
M <- cbind(Data$SO2_Annual, Data$NOx_Annual, Data$CO2_Annual)
X <- cbind(Data$S_n_CR, Data$NumNOxControls, Data$Heat_Input/100000, Data$Barometric_Pressure, Data$Temperature,  Data$PctCapacity, Data$sulfur_Content, Data$Phase2_Indicator, Data$Operating_Time/1000)


dim.cov <- dim(X)[2] #<--------- Num. of Covariates

#------ Variables by treatments
x0 <- X[which(TRT==0),]
x1 <- X[which(TRT==1),]

y0 <- OUT[which(TRT==0)]
y1 <- OUT[which(TRT==1)]

m0 <- log(M[which(TRT==0),])
m1 <- log(M[which(TRT==1),])

n0 <- dim(x0)[1]
n1 <- dim(x1)[1]


#----- Extract MCMC samples from each marginal distribution

data.y0 <- extract(fit.y0)
data.y1 <- extract(fit.y1)
data.m10 <- extract(fit.m1_0)
data.m11 <- extract(fit.m1_1)
data.m20 <- extract(fit.m2_0)
data.m21 <- extract(fit.m2_1)
data.m30 <- extract(fit.m3_0)
data.m31 <- extract(fit.m3_1)

#----- Parameters of the models under Z=1

gamma10 <- data.y1$gamma0
gamma11 <- data.y1$gamma1
w1 <- data.y1$W
beta1_10 <- data.m11$gamma0
beta1_11 <- data.m11$gamma1
beta2_10 <- data.m21$gamma0
beta2_11 <- data.m21$gamma1
beta3_10 <- data.m31$gamma0
beta3_11 <- data.m31$gamma1
s1_1 <- data.m11$W
s2_1 <- data.m21$W
s3_1 <- data.m31$W
psi1 <- data.y1$psi
psi1_1 <- data.m11$psi
psi2_1 <- data.m21$psi
psi3_1 <- data.m31$psi


#----- Parameters of the models under Z=0

gamma00 <- data.y0$gamma0
gamma01 <- data.y0$gamma1
w0 <- data.y0$W
beta1_00 <- data.m10$gamma0
beta1_01 <- data.m10$gamma1
beta2_00 <- data.m20$gamma0
beta2_01 <- data.m20$gamma1
beta3_00 <- data.m30$gamma0
beta3_01 <- data.m30$gamma1
s1_0 <- data.m10$W
s2_0 <- data.m20$W
s3_0 <- data.m30$W
psi0 <- data.y0$psi
psi1_0 <- data.m10$psi
psi2_0 <- data.m20$psi
psi3_0 <- data.m30$psi

#----- Setting Thinning and Burn-in's
Thin <- 5
Burn0 <- 18000  # Extra burn-in periods for Models under Z=0
Burn1 <- 18000   # Extra burn-in periods for Models under Z=1


#----- Set the number of post-processing steps, N
n.iter <- 400

##---------------------------------------------------------------
## 'Main' function on each cluster (parallel)
##---------------------------------------------------------------


main <- function(temp){
    
    
    #----- Require multivariate normal distribuion on each cluster
    library(mnormt)
    
    joint0 <- cbind(y0, m0);  joint1 <- cbind(y1, m1)
    #----- Estimating condtional rank correlations under normality
    fit0 <- lm(joint0 ~ x0)
    fit1 <- lm(joint1 ~ x1)
    cov0 <- 1 / n0 * t(joint0 - predict(fit0, newdata=data.frame(x0))) %*% (joint0 - predict(fit0, newdata=data.frame(x0)))
    cov1 <- 1 / n1 * t(joint1 - predict(fit1, newdata=data.frame(x1))) %*% (joint1 - predict(fit1, newdata=data.frame(x1)))
    
    #----- Under Normality, the following relatioship between
    #----- Spearman and Pearson correlations holds (Kruskal, 1958)
    cor0 <- 6 / pi * asin(matrix(apply(expand.grid(1:4,1:4), 1, function(x) cov0[x[1],x[2]] / sqrt(cov0[x[1],x[1]] * cov0[x[2],x[2]])),4,4) / 2)
    cor1 <- 6 / pi * asin(matrix(apply(expand.grid(1:4,1:4), 1, function(x) cov1[x[1],x[2]] / sqrt(cov1[x[1],x[1]] * cov1[x[2],x[2]])),4,4) / 2)


    #----- Sensitivity parameters
    sens1 <- function(j,k) 2 * min(abs(cor0[j+1,k+1]), abs(cor1[j+1,k+1])) / abs(cor0[j+1,k+1] + cor1[j+1,k+1])
    rm <- function(j,k,rho1) (cor0[j+1,k+1] + cor1[j+1,k+1]) / 2 * rho1
    
    sens2 <- function(k) 2 * min(abs(cor0[1,k+1]), abs(cor1[1,k+1])) / abs(cor0[1,k+1] + cor1[1,k+1])
    ry <- function(k,rho2) (cor0[1,k+1] + cor1[1,k+1]) / 2 * rho2
    
    rho1 <- runif(1, 0, min(apply(expand.grid(1:3,1:3),1, function(x) sens1(x[1],x[2]))))
    rho2 <- runif(1, 0, min(sens2(1),sens2(2), sens2(3)))
    
    #----- Index of iterations (on parallel)
    j <- temp

    #----- The number of covariates samples (with replacement) for the empirical distribution
    size <- 10000
    s.covariate <- data.frame(X[sample(seq(1,dim(X)[1]), size = size, replace = TRUE),])  #----- Covariates samples

    #----- Index of posterior samples
    index0 <- Thin * j + Burn0; index1 <- Thin * j + Burn1
 
    #----- Construct the correlation matrix
    cor.part <- matrix(apply(expand.grid(1:3,1:3), 1, function(x) rm(x[1], x[2], rho1)), 3, 3)
    Cor01 <- 2 * sin(pi * (1/6) * rbind(cbind(cor0[2:4,2:4], cor.part), cbind(cor.part, cor1[2:4,2:4])))
    cor.y1m1 <- 2 * sin(pi * (1/6) * cor1) # Corr. of the observed data for Z=1
    cor.y0m0 <- 2 * sin(pi * (1/6) * cor0) # Corr. of the observed data for Z=0
  
    #----- Weighting parameters of mixtures
    pi.m10 <- pmax(psi1_0[index0,], 0)
    pi.m20 <- pmax(psi2_0[index0,], 0)
    pi.m30 <- pmax(psi3_0[index0,], 0)
    pi.m11 <- pmax(psi1_1[index1,], 0)
    pi.m21 <- pmax(psi2_1[index1,], 0)
    pi.m31 <- pmax(psi3_1[index1,], 0)

    #---- Sampling mediators conditional on covariates
    M0M1 <- function(k){
        F <- pnorm(rmnorm(1, mean=rep(0,6), Cor01),0,1)
        X.temp <- s.covariate[k,]
        for(i in 1:3){
            for(k in 0:1){
                eval(parse(text=paste("clus.m",i,k," <- which(rmultinom(1, 1, pi.m",i,k,")==1)", sep="")))
            }
        }
    
        s.m10<-qnorm(F[1], mean = beta1_00[index0,clus.m10] + beta1_01[index0,]%*%t(X.temp), sd = 1 / sqrt(s1_0[index0,clus.m10]))
        s.m20<-qnorm(F[2], mean = beta2_00[index0,clus.m20] + beta2_01[index0,]%*%t(X.temp), sd = 1 / sqrt(s2_0[index0,clus.m20]))
        s.m30<-qnorm(F[3], mean = beta3_00[index0,clus.m30] + beta3_01[index0,]%*%t(X.temp), sd = 1 / sqrt(s3_0[index0,clus.m30]))
    
        s.m11<-qnorm(F[4], mean = beta1_10[index1,clus.m11] + beta1_11[index1,]%*%t(X.temp), sd = 1 / sqrt(s1_1[index1,clus.m11]))
        s.m21<-qnorm(F[5], mean = beta2_10[index1,clus.m21] + beta2_11[index1,]%*%t(X.temp), sd = 1 / sqrt(s2_1[index1,clus.m21]))
        s.m31<-qnorm(F[6], mean = beta3_10[index1,clus.m31] + beta3_11[index1,]%*%t(X.temp), sd = 1 / sqrt(s3_1[index1,clus.m31]))
    
        return(c(s.m10,s.m20,s.m30,s.m11,s.m21,s.m31))
    }

    #----- Sampling mediators
    C.sample <- sapply(seq(1,size, by=1), function(x) M0M1(x))
  
  
    M1 <- C.sample[4,] - C.sample[1,] # Causal effect on the 1st mediator
    M2 <- C.sample[5,] - C.sample[2,] # Causal effect on the 2nd mediator
    M3 <- C.sample[6,] - C.sample[3,] # Causal effect on the 3rd mediator

    
    return(rbind(M1,M2,M3))
}

result<-foreach(temp = 1:n.iter, .combine = rbind) %dopar% main(temp)

save(result, file="result_Mediators.RData")
stopCluster(cl)
